/*
  Copyright (c) 2020, 2021, Oracle and/or its affiliates.

  This software is dual-licensed to you under the Universal Permissive License
  (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl or Apache License
  2.0 as shown at http://www.apache.org/licenses/LICENSE-2.0. You may choose
  either license.

  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

     https://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
*/

package oracle.r2dbc.impl;

import io.r2dbc.spi.OutParameters;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.Readable;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import oracle.r2dbc.OracleR2dbcWarning;
import oracle.r2dbc.impl.ReadablesMetadata.RowMetadataImpl;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.sql.BatchUpdateException;
import java.sql.ResultSet;
import java.sql.SQLWarning;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.LongStream;
import java.util.stream.Stream;

import static oracle.r2dbc.impl.OracleR2dbcExceptions.fromJdbc;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
import static oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException;
import static oracle.r2dbc.impl.OracleReadableImpl.createRow;
import static oracle.r2dbc.impl.ReadablesMetadata.createRowMetadata;

/**
 * <p>
 * Abstract class providing a base implementation of the R2DBC SPI
 * {@link Result} interface. Concrete subclasses implement
 * {@link #mapSegments(Class, Function)} to return a {@link Publisher} that
 * emits the output of a {@link Segment} mapping function for each
 * {@code Segment} of the {@code Result}. Implementations of R2DBC SPI methods
 * in the base class invoke {@code mapSegments} with a mapping function that
 * filters the emitted {@code Segment}s according to the specification of the
 * SPI method.
 * </p>
 */
abstract class OracleResultImpl implements Result {

  /**
   * Indicates if a method call on this {@code Result} has already returned a
   * {@code Publisher} that allows this {@code Result} to be consumed. In
   * conformance with the R2DBC SPI, an {@code IllegalStateException} is thrown
   * if multiple attempts are made to consume this result; a result may only be
   * consumed once. This flag is read and written by {@link #setPublished()}.
   * NOTE(review): this is a plain {@code boolean}, so the check-then-set in
   * {@code setPublished()} is not atomic; confirm that a result is never
   * published from multiple threads concurrently.
   */
  private boolean isPublished = false;

  /**
   * Private constructor invoked by the nested subclasses declared in this
   * class. Being private, it prevents subclassing from outside of this class.
   */
  private OracleResultImpl() { }

  /**
   * <p>
   * Returns a publisher that emits the output of a segment mapping function for
   * each segment of this result. The mapping function only accepts segments of
   * a specified type. This method is called from methods of the public API to
   * create publishers of different value types. For instance, the
   * {@link #getRowsUpdated()} method creates a {@code Publisher<Long>} by
   * calling this method with an {@code UpdateCount.class} segment type and a
   * function that maps {@code UpdateCount} segments to a {@code Long}.
   * </p><p>
   * Any segments, other than {@code Message} segments, that are not an instance
   * of the {@code segmentType} should not be passed to the
   * {@code segmentMapper}, and should not be emitted by the returned publisher
   * in any form. However, {@code Message} segments are an exception. The error
   * of a {@code Message} segment must be emitted as an {@code onError} signal,
   * even if the {@code segmentType} is not assignable from {@code Message}
   * segments.
   * </p>
   * @param segmentType Class of {@code Segment} to map. Not null.
   * @param segmentMapper Maps segments to published values.
   * @return A publisher that emits the mapped values.
   * @param <T> Segment type to map
   * @param <U> Type of mapped value
   */
  protected abstract <T extends Segment, U> Publisher<U> mapSegments(
    Class<T> segmentType, Function<? super T, U> segmentMapper);

  /**
   * {@inheritDoc}
   * <p>
   * Implements the R2DBC SPI method by mapping every {@link Segment} of this
   * result to a publisher with the {@code mappingFunction}, and then
   * flattening those publishers into a single stream. The publishers are
   * joined with the {@code Flux.concat(Publisher<Publisher>)} factory, so
   * they are subscribed to serially, one after another.
   * </p><p>
   * The returned {@code Publisher} does not support multiple
   * {@code Subscriber}s.
   * </p>
   * @throws IllegalStateException If this result has already been consumed.
   */
  @Override
  public <T> Publisher<T> flatMap(
    Function<Segment, ? extends Publisher<? extends T>> mappingFunction) {
    requireNonNull(mappingFunction, "mappingFunction is null");
    setPublished();

    // One inner publisher per segment, in the order the segments are emitted
    Publisher<? extends Publisher<? extends T>> segmentPublishers =
      mapSegments(Segment.class, mappingFunction);

    return singleSubscriber(Flux.concat(segmentPublishers));
  }

  /**
   * {@inheritDoc}
   * <p>
   * Implements the R2DBC SPI method to return a {@code Publisher} emitting the
   * update counts of this {@code Result} as {@link Long} values. Each
   * {@link UpdateCount} segment is mapped to the value returned by
   * {@link UpdateCount#value()}; no narrowing conversion is applied.
   * </p><p>
   * Unlike the {@code map} and {@code flatMap} methods, the publisher
   * returned by this method is not wrapped to reject additional subscribers;
   * it is the publisher returned by {@link #mapSegments(Class, Function)}.
   * </p>
   * @throws IllegalStateException If this result has already been consumed.
   */
  @Override
  public Publisher<Long> getRowsUpdated() {
    setPublished();

    Function<UpdateCount, Long> toValue = UpdateCount::value;
    return mapSegments(UpdateCount.class, toValue);
  }

  /**
   * {@inheritDoc}
   * <p>
   * Implements the R2DBC SPI method to return a {@code Publisher} emitting the
   * output of a {@code mappingFunction} for each {@link Row} of this
   * {@code Result}. The function receives the row together with the metadata
   * returned by {@link Row#getMetadata()}.
   * </p><p>
   * The returned {@code Publisher} does not support multiple
   * {@code Subscriber}s.
   * </p>
   * @throws IllegalStateException If this result has already been consumed.
   */
  @Override
  public <T> Publisher<T> map(
    BiFunction<Row, RowMetadata, ? extends T> mappingFunction) {
    requireNonNull(mappingFunction, "mappingFunction is null");
    setPublished();

    // Adapts the two-argument row mapper into a segment mapper
    Function<RowSegment, T> rowMapper = segment -> {
      Row nextRow = segment.row();
      RowMetadata rowMetadata = nextRow.getMetadata();
      return mappingFunction.apply(nextRow, rowMetadata);
    };

    return singleSubscriber(mapSegments(RowSegment.class, rowMapper));
  }

  /**
   * {@inheritDoc}
   * <p>
   * Implements the R2DBC SPI method to return a {@code Publisher} emitting the
   * output of a {@code mappingFunction} for each {@link Readable} of this
   * {@code Result}. Every segment that implements {@code ReadableSegment} is
   * mapped; in this class that includes both row segments and out parameter
   * segments.
   * </p><p>
   * The returned {@code Publisher} does not support multiple
   * {@code Subscriber}s.
   * </p>
   * @throws IllegalStateException If this result has already been consumed.
   */
  @Override
  public <T> Publisher<T> map(
    Function<? super Readable, ? extends T> mappingFunction) {
    requireNonNull(mappingFunction, "mappingFunction is null");
    setPublished();

    // Adapts the Readable mapper into a segment mapper
    Function<ReadableSegment, T> readableMapper =
      segment -> mappingFunction.apply(segment.getReadable());

    return singleSubscriber(
      mapSegments(ReadableSegment.class, readableMapper));
  }

  /**
   * {@inheritDoc}
   * <p>
   * Implements the R2DBC SPI method by wrapping this result in a new
   * {@code OracleResultImpl}. The wrapper implements
   * {@link OracleResultImpl#mapSegments(Class, Function)} so that segments of
   * this result are tested with the specified {@code filter} predicate before
   * they are mapped. This method does not itself mark this result as
   * published.
   * </p>
   */
  @Override
  public OracleResultImpl filter(Predicate<Segment> filter) {
    requireNonNull(filter, "filter is null");

    OracleResultImpl filteredView = new FilteredResult(this, filter);
    return filteredView;
  }

  /**
   * <p>
   * Adds this result to a collection of results that depend on a JDBC
   * statement. After this method is called, the JDBC statement must remain open
   * until this result signals that it has been closed.
   * </p><p>
   * This method must only be invoked when it is certain that this
   * result will be received by user code. If user code never receives this
   * result, then it can never consume it, and the JDBC statement is never
   * closed. Otherwise, once this result reaches user code, that code is
   * responsible for consuming it. The R2DBC specification requires results to
   * be fully consumed; there is no other way for Oracle R2DBC to know when
   * it is safe to close the JDBC statement.
   * </p><p>
   * Depending on the type of this result, this method may or may not actually
   * do anything. For instance, if this result is an update count, then it
   * doesn't depend on a JDBC statement, and so it won't actually register
   * itself as a dependent. This result registers itself only if it retains
   * something like a {@code ResultSet} which depends on the JDBC statement to
   * remain open.
   * </p><p>
   * Additional results may be added to the collection after this method
   * returns. In particular, a REF CURSOR is backed by a ResultSet, and that
   * ResultSet will be closed when the JDBC statement is closed. At the time
   * when this method is called, it is not known what the user defined mapping
   * function will do. A check to see if a REF CURSOR ResultSet was created can
   * only happen after the user defined function has executed.
   * </p><p>
   * This method is implemented by the OracleResultImpl super class to do
   * nothing. Subclasses that depend on the JDBC statement override this method
   * and add themselves to the collection of dependent results. Composite
   * subclasses override it to delegate to the result they wrap.
   * </p>
   */
  void addDependent() {
    // Do nothing for non-dependent results. This method is overridden by the
    // DependentResult subclass to add a dependent result.
  }

  /**
   * Records that a {@code Publisher} which allows this {@code Result} to be
   * consumed has been created. The {@link Result} SPI contract does not allow
   * the same result to be consumed more than once, so a second call to this
   * method fails.
   * <em>
   *   This method MUST be called before returning a Publisher to user code from
   *   all public APIs of this class.
   * </em>
   * @throws IllegalStateException If this result has already been consumed.
   */
  protected void setPublished() {
    // First call: remember that a publisher has been handed out
    if (!isPublished) {
      isPublished = true;
      return;
    }

    throw new IllegalStateException(
      "A result can not be consumed more than once");
  }

  /**
   * Creates a {@code Result} that publishes the rows of a JDBC
   * {@code resultSet} as {@link RowSegment}s.
   * @param r2dbcConnection The R2DBC connection that created this result. Not
   * null.
   * @param dependentCounter Collection of results that depend on the JDBC
   * statement which created the {@code ResultSet} to remain open until all
   * results are consumed.
   * @param resultSet {@code ResultSet} to publish. Not null.
   * @return A {@code Result} for a ResultSet
   */
  public static OracleResultImpl createQueryResult(
    OracleConnectionImpl r2dbcConnection,
    DependentCounter dependentCounter, ResultSet resultSet) {
    OracleResultImpl queryResult =
      new ResultSetResult(r2dbcConnection, dependentCounter, resultSet);
    return queryResult;
  }

  /**
   * Creates a {@code Result} that publishes {@code outParameters} as a single
   * {@link OutSegment}.
   * @param dependentCounter Collection of results that depend on the JDBC
   * statement which created the {@code OutParameters} to remain open until all
   * results are consumed.
   * @param outParameters {@code OutParameters} to publish. Not null.
   * @param adapter Adapts JDBC calls into reactive streams. Not null.
   * @return A {@code Result} for {@code OutParameters}
   */
  static OracleResultImpl createCallResult(
    DependentCounter dependentCounter, OutParameters outParameters,
    ReactiveJdbcAdapter adapter) {
    OracleResultImpl callResult =
      new CallResult(dependentCounter, outParameters, adapter);
    return callResult;
  }

  /**
   * Creates a {@code Result} that publishes an {@code updateCount} as an
   * {@link UpdateCount} segment, followed by the rows of a
   * {@code generatedKeys} {@code ResultSet} as {@link RowSegment}s.
   * @param r2dbcConnection The R2DBC connection that created this result. Not
   * null.
   * @param updateCount Update count to publish
   * @param dependentCounter Collection of results that depend on the JDBC
   * statement which created the {@code generatedKeys} {@code ResultSet} to
   * remain open until all results are consumed.
   * @param generatedKeys Generated values to publish. Not null.
   * @return A {@code Result} for values generated by DML
   */
  static OracleResultImpl createGeneratedValuesResult(
    OracleConnectionImpl r2dbcConnection, long updateCount,
    DependentCounter dependentCounter, ResultSet generatedKeys) {
    OracleResultImpl generatedValuesResult = new GeneratedKeysResult(
      r2dbcConnection, updateCount, dependentCounter, generatedKeys);
    return generatedValuesResult;
  }

  /**
   * Creates a {@code Result} that publishes an {@code updateCount} as an
   * {@link UpdateCount} segment.
   * @param updateCount Update count to publish
   * @return A {@code Result} for a DML update
   */
  static OracleResultImpl createUpdateCountResult(long updateCount) {
    OracleResultImpl updateCountResult = new UpdateCountResult(updateCount);
    return updateCountResult;
  }

  /**
   * Creates a {@code Result} that publishes each element of a batch of
   * {@code updateCounts} as an {@link UpdateCount} segment.
   * @param updateCounts Update counts to publish. Not null.
   * @return A {@code Result} for a batch DML update
   */
  static OracleResultImpl createBatchUpdateResult(long[] updateCounts) {
    OracleResultImpl batchResult = new BatchUpdateResult(updateCounts);
    return batchResult;
  }

  /**
   * Creates a {@code Result} that publishes the update counts of a
   * {@code batchUpdateException} as {@link UpdateCount} segments, followed by
   * a {@link Message} segment with the {@code batchUpdateException} mapped to
   * an {@link R2dbcException}.
   * @param batchUpdateException BatchUpdateException to publish. Not null.
   * @return A {@code Result} for a failed DML batch update
   */
  static OracleResultImpl createBatchUpdateErrorResult(
    BatchUpdateException batchUpdateException) {
    OracleResultImpl batchErrorResult =
      new BatchUpdateErrorResult(batchUpdateException);
    return batchErrorResult;
  }

  /**
   * Creates a {@code Result} that publishes an {@code r2dbcException} as a
   * {@link Message} segment.
   * @param r2dbcException Error to publish. Not null.
   * @return A {@code Result} for failed {@code Statement} execution
   */
  static OracleResultImpl createErrorResult(R2dbcException r2dbcException) {
    OracleResultImpl errorResult = new ErrorResult(r2dbcException);
    return errorResult;
  }

  /**
   * Creates a {@code Result} that publishes the {@code Segment}s of a
   * {@code result}, followed by a {@code warning} chain as {@link Message}
   * segments.
   * @param sql The SQL that resulted in a warning. Not null.
   * @param warning Warning to publish. Not null.
   * @param result Result to publish. Not null.
   * @return A {@code Result} for a {@code Statement} execution that
   * completed with a warning.
   */
  static OracleResultImpl createWarningResult(
    String sql, SQLWarning warning, OracleResultImpl result) {
    OracleResultImpl warningResult = new WarningResult(sql, warning, result);
    return warningResult;
  }

  /**
   * {@link OracleResultImpl} subclass that publishes a single update count. An
   * instance of this class constructed with a negative valued update count
   * will publish no {@code Segment}s.
   */
  private static final class UpdateCountResult extends OracleResultImpl {

    /** Update count to publish. A negative value is never published. */
    private final long updateCount;

    /**
     * Constructs a result that publishes an {@code updateCount}.
     * @param updateCount Update count to publish, or a negative value if
     * there is no update count to publish.
     */
    private UpdateCountResult(long updateCount) {
      this.updateCount = updateCount;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Emits nothing if the update count is negative, or if the
     * {@code segmentType} does not accept update count segments. A negative
     * value is not a count of rows, so it is never exposed to user code as an
     * {@code UpdateCount} segment.
     * </p><p>
     * This method uses Mono's fromSupplier factory to defer segment mapping
     * until the publisher is subscribed to. This ensures that segments are
     * consumed in the correct order when the returned publisher is concatenated
     * after another, as with
     * {@link BatchUpdateErrorResult#mapSegments(Class, Function)}, for
     * instance. Additionally, the factory handles any exception thrown by the
     * segment mapper by translating it in to an onError signal.
     * </p>
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      // Honor the documented contract of this class: no segment is published
      // for a negative update count.
      if (updateCount < 0)
        return Mono.empty();

      if (!segmentType.isAssignableFrom(UpdateCountImpl.class))
        return Mono.empty();

      return Mono.fromSupplier(() ->
        segmentMapper.apply(segmentType.cast(
          new UpdateCountImpl(updateCount))));
    }
  }

  /**
   * <p>
   * {@link OracleResultImpl} subclass that publishes the rows of a JDBC
   * {@link ResultSet} as {@link RowSegment}s. The {@link RowMetadata} of
   * published {@code Rows} is derived from the
   * {@link java.sql.ResultSetMetaData} of the {@link ResultSet}.
   * </p><p>
   * This {@code Result} is <i>not</i> implemented to publish
   * {@link SQLWarning} chains returned by {@link ResultSet#getWarnings()} as
   * {@link Message} segments. This implementation is correct for the 21.1
   * Oracle JDBC Driver which is known to implement {@code getWarnings()} by
   * returning {@code null} for forward-only insensitive {@code ResultSets}
   * when no invocation of {@link java.sql.Statement#setMaxRows(int)}
   * or {@link java.sql.Statement#setLargeMaxRows(long)} has occurred.
   * </p><p>
   * It is a known limitation of the 21.1 Oracle JDBC Driver that
   * {@link ResultSet#getWarnings()} can not be invoked after row publishing
   * has been initiated; the {@code ResultSet} is logically closed once row
   * publishing has been initiated, and so {@code getWarnings} would throw a
   * {@link java.sql.SQLException} to indicate a closed {@code ResultSet}. If
   * a later release of Oracle JDBC removes this limitation, then this
   * {@code Result} should be implemented to invoke {@code getWarnings} to
   * ensure correctness if a later release of Oracle JDBC also returns non-null
   * values from that method.
   * </p>
   */
  private static final class ResultSetResult extends DependentResult {

    /** Connection that created this result; passed on to the published row */
    private final OracleConnectionImpl r2dbcConnection;

    /** The JDBC result set whose rows are published */
    private final ResultSet resultSet;

    /** Row metadata created from the metadata of the {@link #resultSet} */
    private final RowMetadataImpl metadata;

    /** Adapter of the {@link #r2dbcConnection}, used to publish rows */
    private final ReactiveJdbcAdapter adapter;

    /**
     * Constructs a result that publishes the rows of a {@code resultSet}. The
     * row metadata is read from the {@code resultSet} by this constructor.
     * @param r2dbcConnection Connection that created this result. Not null.
     * @param dependentCounter Collection of results that depend on the JDBC
     * statement which created the {@code resultSet}.
     * @param resultSet {@code ResultSet} to publish. Not null.
     */
    private ResultSetResult(
      OracleConnectionImpl r2dbcConnection,
      DependentCounter dependentCounter,
      ResultSet resultSet) {
      super(dependentCounter);
      this.r2dbcConnection = r2dbcConnection;
      this.resultSet = resultSet;
      this.metadata = createRowMetadata(fromJdbc(resultSet::getMetaData));
      this.adapter = r2dbcConnection.adapter();
    }

    /**
     * Publishes the mapping of a row segment for each row of the
     * {@link #resultSet}, or publishes nothing if the {@code segmentType}
     * does not accept row segments.
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapDependentSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      if (segmentType.isAssignableFrom(RowSegmentImpl.class)) {

        // A single Row object is shared by all rows in order to avoid an
        // allocation per row. Only the readable behind it is swapped.
        ReusableJdbcReadable rowReadable = new ReusableJdbcReadable();
        Row sharedRow = createRow(
          r2dbcConnection, dependentCounter, rowReadable, metadata);

        return adapter.publishRows(resultSet, nextReadable -> {
          rowReadable.current = nextReadable;
          T rowSegment = segmentType.cast(new RowSegmentImpl(sharedRow));
          return segmentMapper.apply(rowSegment);
        });
      }

      return Mono.empty();
    }

    /**
     * A {@link oracle.r2dbc.impl.ReactiveJdbcAdapter.JdbcReadable} that
     * delegates to another readable, which is assigned to {@link #current}.
     * A single instance of {@code OracleReadableImpl.RowImpl} can retain an
     * instance of this class, and that instance can read multiple rows when
     * the value of {@link #current} is changed between invocations of a user
     * defined row mapping function. This avoids allocating an object for each
     * row of a query result.
     */
    private static final class ReusableJdbcReadable
      implements ReactiveJdbcAdapter.JdbcReadable {

      /** Readable of the row currently being mapped; null before any row */
      ReactiveJdbcAdapter.JdbcReadable current = null;

      /** Reads a value from the {@link #current} readable */
      @Override
      public <T> T getObject(int index, Class<T> type) {
        return current.getObject(index, type);
      }
    }
  }

  /**
   * {@link OracleResultImpl} subclass that publishes an update count as an
   * {@link UpdateCount} segment, followed by the rows of a JDBC
   * {@link ResultSet} as {@link RowSegment}s. This class is a composite of a
   * {@link UpdateCountResult} and a {@link ResultSetResult}.
   */
  private static final class GeneratedKeysResult extends OracleResultImpl {

    /** Publishes the update count; its segments are emitted first */
    private final OracleResultImpl updateCountResult;

    /** Publishes the generated keys; its segments are emitted last */
    private final OracleResultImpl generatedKeysResult;

    /**
     * Constructs a result that publishes an {@code updateCount} followed by
     * the rows of a {@code generatedKeys} result set.
     */
    private GeneratedKeysResult(
      OracleConnectionImpl r2dbcConnection,
      long updateCount, DependentCounter dependentCounter,
      ResultSet generatedKeys) {
      this.updateCountResult = new UpdateCountResult(updateCount);
      this.generatedKeysResult =
        new ResultSetResult(r2dbcConnection, dependentCounter, generatedKeys);
    }

    /** Registers the generated keys result as a dependent */
    @Override
    void addDependent() {
      generatedKeysResult.addDependent();
    }

    /**
     * Concatenates the mapped segments of the update count result with the
     * mapped segments of the generated keys result, in that order.
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      Publisher<U> countSegments =
        updateCountResult.mapSegments(segmentType, segmentMapper);
      Publisher<U> keySegments =
        generatedKeysResult.mapSegments(segmentType, segmentMapper);

      return Flux.concat(countSegments, keySegments);
    }
  }

  /**
   * {@link OracleResultImpl} subclass that publishes a single instance of
   * {@link OutParameters} as an {@link OutSegment}.
   */
  private static final class CallResult extends DependentResult {

    /** The out parameters published by this result */
    private final OutParameters outParameters;

    /** Adapter providing the lock under which the segment is mapped */
    private final ReactiveJdbcAdapter adapter;

    /**
     * Constructs a result that publishes {@code outParameters}.
     * @param dependentCounter Collection of results that depend on the JDBC
     * statement which created the {@code outParameters}.
     * @param outParameters {@code OutParameters} to publish. Not null.
     * @param adapter Adapts JDBC calls into reactive streams. Not null.
     */
    private CallResult(
      DependentCounter dependentCounter, OutParameters outParameters,
      ReactiveJdbcAdapter adapter) {
      super(dependentCounter);
      this.outParameters = outParameters;
      this.adapter = adapter;
    }

    /**
     * Publishes the mapping of a single out parameter segment, or publishes
     * nothing if the {@code segmentType} does not accept out parameter
     * segments.
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapDependentSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      if (segmentType.isAssignableFrom(OutSegmentImpl.class)) {
        // The outParameters are backed by a JDBC CallableStatement, which may
        // block a thread when values are accessed with
        // CallableStatement.getObject(...). For that reason, the segment is
        // mapped only after the JDBC lock has been acquired asynchronously.
        return adapter.getLock().get(() ->
          segmentMapper.apply(
            segmentType.cast(new OutSegmentImpl(outParameters))));
      }

      return Mono.empty();
    }
  }

  /**
   * {@link OracleResultImpl} subclass that publishes each element of an array
   * of update counts as an {@link UpdateCount} segment.
   */
  private static final class BatchUpdateResult extends OracleResultImpl {

    /** Update counts to publish, in array order */
    private final long[] updateCounts;

    /**
     * Constructs a result that publishes {@code updateCounts}.
     * @param updateCounts Update counts to publish. Not null.
     */
    private BatchUpdateResult(long[] updateCounts) {
      this.updateCounts = updateCounts;
    }

    /**
     * Publishes the mapping of an update count segment for each element of
     * the array, or publishes nothing if the {@code segmentType} does not
     * accept update count segments.
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      if (segmentType.isAssignableFrom(UpdateCountImpl.class)) {
        // The stream is lazy: the segment mapper is not applied until the
        // returned publisher pulls elements from it.
        Stream<U> mappedCounts = LongStream.of(updateCounts)
          .mapToObj(count -> {
            T countSegment = segmentType.cast(new UpdateCountImpl(count));
            return segmentMapper.apply(countSegment);
          });

        return Flux.fromStream(mappedCounts);
      }

      return Mono.empty();
    }
  }

  /**
   * {@link OracleResultImpl} subclass that publishes the array of update
   * counts from a {@link BatchUpdateException} as {@link UpdateCount}
   * segments, followed by a {@link Message} segment with the
   * {@link BatchUpdateException} mapped to an {@link R2dbcException}. This
   * class is a composite of {@link BatchUpdateResult} and {@link ErrorResult}.
   */
  private static final class BatchUpdateErrorResult extends OracleResultImpl {

    /** Publishes the update counts; its segments are emitted first */
    private final BatchUpdateResult batchUpdateResult;

    /** Publishes the error; its segment is emitted last */
    private final ErrorResult errorResult;

    /**
     * Constructs a result from the large update counts and the error of a
     * {@code batchUpdateException}.
     * @param batchUpdateException BatchUpdateException to publish. Not null.
     */
    private BatchUpdateErrorResult(BatchUpdateException batchUpdateException) {
      long[] completedCounts = batchUpdateException.getLargeUpdateCounts();
      this.batchUpdateResult = new BatchUpdateResult(completedCounts);

      R2dbcException batchError = toR2dbcException(batchUpdateException);
      this.errorResult = new ErrorResult(batchError);
    }

    /**
     * Concatenates the mapped segments of the update counts with the mapped
     * segment of the error, in that order.
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      Publisher<U> countSegments =
        batchUpdateResult.mapSegments(segmentType, segmentMapper);
      Publisher<U> errorSegments =
        errorResult.mapSegments(segmentType, segmentMapper);

      return Flux.concat(countSegments, errorSegments);
    }

  }

  /**
   * {@link OracleResultImpl} subclass that publishes an {@link R2dbcException}
   * as a {@link Message} segment.
   */
  private static final class ErrorResult extends OracleResultImpl {

    /** The error published by this result */
    private final R2dbcException r2dbcException;

    /**
     * Constructs a result that publishes an {@code r2dbcException}.
     * @param r2dbcException Error to publish. Not null.
     */
    private ErrorResult(R2dbcException r2dbcException) {
      this.r2dbcException = r2dbcException;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Emits the mapping of a message segment if the {@code segmentType}
     * accepts message segments, and otherwise emits the error as an
     * {@code onError} signal. Unlike other segment types, message segments
     * represent an error that must be delivered to user code. Even when user
     * code is calling for some other segment type, like rows with
     * {@link #map(BiFunction)}, or update counts with
     * {@link #getRowsUpdated()}, user code does not want these calls to
     * ignore the error. If user code really does want to ignore errors, it
     * may call {@link #filter(Predicate)} to ignore message segments, or
     * {@link #flatMap(Function)} to recover from message segments.
     * </p><p>
     * Mono's fromSupplier factory is used so that segment mapping is deferred
     * until the publisher is subscribed to. This ensures that segments are
     * consumed in the correct order when the returned publisher is
     * concatenated after another, as with
     * {@link BatchUpdateErrorResult#mapSegments(Class, Function)}, for
     * instance. Additionally, the factory handles any exception thrown by the
     * segment mapper by translating it in to an onError signal.
     * </p>
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      if (segmentType.isAssignableFrom(MessageImpl.class)) {
        return Mono.fromSupplier(() -> {
          T message = segmentType.cast(new MessageImpl(r2dbcException));
          return segmentMapper.apply(message);
        });
      }

      // The caller did not ask for messages, so the error becomes onError
      return Mono.error(r2dbcException);
    }
  }

  /**
   * {@link OracleResultImpl} subclass that publishes the segments of another
   * {@link OracleResultImpl}, followed by a {@link SQLWarning} chain as
   * {@link Message} segments.
   */
  private static final class WarningResult extends OracleResultImpl {

    /** The SQL that resulted in a warning */
    private final String sql;

    /** The first warning of the chain published by this result */
    private final SQLWarning warning;

    /** The result whose segments are published before the warnings */
    private final OracleResultImpl result;

    /**
     * Constructs a result that publishes the segments of a {@code result},
     * and then publishes a {@code warning} chain as {@link Message}s.
     * @param sql The SQL that resulted in a warning
     * @param warning Warning to publish. Not null.
     * @param result Result of segments to publish before the warning. Not
     * null.
     */
    private WarningResult(
      String sql, SQLWarning warning, OracleResultImpl result) {
      this.sql = sql;
      this.warning = warning;
      this.result = result;
    }

    /** Registers the wrapped {@link #result} as a dependent */
    @Override
    void addDependent() {
      result.addDependent();
    }

    /**
     * @implNote In the 1.0.0 release, message segments for the warning were
     * emitted prior to any segments from the {@link #result}. Unless message
     * segments were consumed by {@link #flatMap(Function)}, the publisher
     * returned to user code would emit onError before emitting values from
     * the {@link #result}.
     * Revisiting this decision before the 1.1.0 release, it really seems like a
     * bad one. It is thought that user code would typically want to consume
     * results before handling warnings and errors, and so the order is reversed
     * in later releases. Segments are now emitted from the {@link #result}
     * first, followed by the message segments. This change in behavior should
     * be safe, as the R2DBC SPI does not specify any ordering for this case.
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      // Segments of the wrapped result always come first
      Publisher<U> resultSegments;
      if (result == null)
        resultSegments = Mono.empty();
      else
        resultSegments = result.mapSegments(segmentType, segmentMapper);

      Publisher<U> warningSegments;
      if (segmentType.isAssignableFrom(WarningImpl.class)) {
        // Walks the warning chain until getNextWarning() returns null
        Stream<SQLWarning> warningChain = Stream.iterate(
          warning, Objects::nonNull, SQLWarning::getNextWarning);

        warningSegments = Flux.fromStream(
          warningChain.map(nextWarning ->
            segmentMapper.apply(segmentType.cast(
              new WarningImpl(toR2dbcException(nextWarning, sql))))));
      }
      else {
        // Do not emit warnings unless flatMap or filter will consume them
        warningSegments = Mono.empty();
      }

      return Flux.concat(resultSegments, warningSegments);
    }
  }

  /**
   * A result that filters out {@code Segment}s of another result. Filtered
   * segments are mapped to the {@link #FILTERED} object, which is then dropped
   * before values reach the subscriber.
   */
  private static final class FilteredResult extends OracleResultImpl {

    /**
     * An object that represents a filtered {@code Segment}. This object is
     * output by a segment mapping function defined in
     * {@link #mapSegments(Class, Function)}.
     */
    private static final Object FILTERED = new Object();

    /** Result of segments to publish after applying the {@link #filter} */
    private final OracleResultImpl result;

    /** Returns {@code false} for segments that should be filtered */
    private final Predicate<Segment> filter;

    /**
     * Constructs a new result that applies a {@code filter} when publishing
     * segments of a {@code result}.
     * @param result Result of segments to filter
     * @param filter Predicate that accepts the segments to keep
     */
    private FilteredResult(OracleResultImpl result, Predicate<Segment> filter) {
      this.result = result;
      this.filter = filter;
    }

    /** Registers the wrapped {@link #result} as a dependent */
    @Override
    void addDependent() {
      result.addDependent();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Passes {@code Segment.class} to the {@code mapSegments} method of the
     * filtered {@link #result}, so that every segment is tested with the
     * filtering predicate. Mapping functions must return a non-null value, so
     * a dummy object, {@link #FILTERED}, is returned for segments that are
     * rejected by the predicate, and for accepted segments that are not of the
     * {@code segmentType}. The {@code FILTERED} object is then removed by a
     * downstream filter operator.
     * </p><p>
     * It is important that {@code Segment.class} be passed to the
     * {@code mapSegments} method of the filtered result, otherwise
     * {@code Message} segments would be emitted with {@code onError} and bypass
     * the filtering function. If the filter does not exclude a message segment,
     * and the {@code segmentMapper} does not accept message segments, only then
     * will the exception of the message segment be emitted with onError.
     * </p>
     */
    @Override
    protected <T extends Segment, U> Publisher<U> mapSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      @SuppressWarnings("unchecked")
      U placeholder = (U)FILTERED;

      Function<Segment, U> filteringMapper = segment -> {
        // Rejected by the user's predicate
        if (!filter.test(segment))
          return placeholder;

        // Accepted, and of the type the caller asked for
        if (segmentType.isAssignableFrom(segment.getClass()))
          return segmentMapper.apply(segmentType.cast(segment));

        // Accepted messages that the caller can not map become onError
        if (segment instanceof Message)
          throw ((Message)segment).exception();

        // Accepted, but not of the type the caller asked for
        return placeholder;
      };

      Flux<U> mappedSegments =
        Flux.from(result.mapSegments(Segment.class, filteringMapper));

      return mappedSegments.filter(next -> next != FILTERED);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Overridden to also set the filtered result as published. If this method
     * is called, then a method of the public API has been called to return
     * a publisher from this result. If user code somehow has a reference to the
     * filtered result as well, then the filtered result should also throw
     * {@code IllegalStateException} if one of its public methods is invoked.
     * The filtered result is marked first, and then this result.
     * </p>
     */
    @Override
    protected void setPublished() {
      result.setPublished();
      super.setPublished();
    }
  }

  /**
   * Common interface for instances of {@link Segment} that carry a
   * {@link Readable} value. The {@link #map(Function)} method filters for this
   * segment type, and uses the common {@link #getReadable()} method to obtain
   * a {@link Readable} from the segment.
   */
  private interface ReadableSegment extends Segment {
    /**
     * Returns the {@link Readable} value of this {@code Segment}.
     * @return The {@code Readable} carried by this segment
     */
    Readable getReadable();
  }

  /**
   * A {@link RowSegment} that wraps a single {@link Row}. Instances also
   * implement {@link ReadableSegment}, exposing the same {@code Row} as a
   * {@link Readable}, and so satisfy the filter of {@link #map(Function)}.
   */
  private static final class RowSegmentImpl
    implements RowSegment, ReadableSegment {

    /** The row carried by this segment */
    private final Row row;

    /**
     * Constructs a segment that carries a {@code row}.
     * @param row Row of this segment
     */
    private RowSegmentImpl(Row row) {
      this.row = row;
    }

    /** Returns the row this segment was constructed with */
    @Override
    public Row row() {
      return row;
    }

    /** Returns the row of this segment, viewed as a {@code Readable} */
    @Override
    public Readable getReadable() {
      return row();
    }
  }

  /**
   * An {@link OutSegment} that wraps a single {@link OutParameters} object.
   * Instances also implement {@link ReadableSegment}, exposing the same
   * {@code OutParameters} as a {@link Readable}, and so satisfy the filter of
   * {@link #map(Function)}.
   */
  private static final class OutSegmentImpl
    implements OutSegment, ReadableSegment {

    /** The out parameters carried by this segment */
    private final OutParameters outParameters;

    /**
     * Constructs a segment that carries {@code outParameters}.
     * @param outParameters Out parameters of this segment
     */
    private OutSegmentImpl(OutParameters outParameters) {
      this.outParameters = outParameters;
    }

    /** Returns the out parameters this segment was constructed with */
    @Override
    public OutParameters outParameters() {
      return outParameters;
    }

    /** Returns the out parameters of this segment, viewed as a {@code Readable} */
    @Override
    public Readable getReadable() {
      return outParameters();
    }
  }


  /**
   * Base class for a result that needs a JDBC statement to stay open until the
   * result has been consumed. All interaction with the
   * {@link DependentCounter}, which represents the set of results depending on
   * one JDBC statement, is handled here. A subclass only implements
   * {@link #mapDependentSegments(Class, Function)}, which has the same
   * specification as {@link #mapSegments(Class, Function)}.
   */
  private static abstract class DependentResult extends OracleResultImpl {

    /**
     * Counter of the results that require the JDBC statement which created
     * this result to remain open until every one of them has been consumed.
     */
    protected final DependentCounter dependentCounter;

    /**
     * Constructs a result that registers itself with, and later deregisters
     * itself from, a collection of dependent results.
     * @param dependentCounter Collection of dependent results. Not null.
     */
    private DependentResult(DependentCounter dependentCounter) {
      this.dependentCounter = dependentCounter;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Registers this result by incrementing the counter of dependent results.
     * </p>
     */
    @Override
    void addDependent() {
      dependentCounter.increment();
    }

    /**
     * {@inheritDoc}
     * <p>
     * The actual mapping of segments is performed by the subclass
     * implementation of {@code mapDependentSegments(Class, Function)}. This
     * method wraps the subclass publisher so that this result is removed from
     * the collection of dependent results once segment mapping terminates
     * with {@code onComplete}, {@code onError}, or {@code cancel}.
     * </p>
     */
    @Override
    protected final <T extends Segment, U> Publisher<U> mapSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper) {

      // The decrement publisher is obtained before the subclass is invoked
      final Publisher<Void> release = dependentCounter.decrement();
      final Publisher<U> mapped =
        mapDependentSegments(segmentType, segmentMapper);

      return Publishers.concatTerminal(mapped, release);
    }

    /**
     * Maps segments exactly as specified by
     * {@link #mapSegments(Class, Function)}. The base class implementation of
     * {@link #mapSegments(Class, Function)} calls this method, and takes care
     * of removing this result from the collection of dependents when the
     * returned publisher is terminated.
     * @param segmentType Class of {@code Segment} to map. Not null.
     * @param segmentMapper Maps segments to published values.
     * @param <T> Segment type to map
     * @param <U> Type of mapped value
     * @return A publisher that emits the mapped values.
     */
    protected abstract <T extends Segment, U> Publisher<U> mapDependentSegments(
      Class<T> segmentType, Function<? super T, U> segmentMapper);
  }

  /**
   * Implementation of {@link UpdateCount} that holds a fixed {@code long}
   * value.
   */
  private static final class UpdateCountImpl implements UpdateCount {

    /** The count returned by {@link #value()} */
    private final long value;

    /**
     * Constructs an update count with the given {@code value}.
     * @param value Count returned by {@link #value()}
     */
    private UpdateCountImpl(long value) {
      this.value = value;
    }

    /** Returns the value this update count was constructed with */
    @Override
    public long value() {
      return value;
    }
  }

  /**
   * Implementation of {@link Message} that is backed by an
   * {@link R2dbcException}. Every accessor of this class delegates to the
   * exception it was constructed with.
   */
  private static class MessageImpl implements Message {

    /** The exception that supplies all values returned by this message */
    private final R2dbcException exception;

    /**
     * Constructs a message backed by an {@code exception}.
     * @param exception Exception returned by {@link #exception()}, and used
     * as the source of the error code, SQL state, and message text.
     */
    private MessageImpl(R2dbcException exception) {
      this.exception = exception;
    }

    /** Returns the exception this message was constructed with */
    @Override
    public R2dbcException exception() {
      return exception;
    }

    /** Returns the error code of the backing exception */
    @Override
    public int errorCode() {
      return exception.getErrorCode();
    }

    /** Returns the SQL state of the backing exception */
    @Override
    public String sqlState() {
      return exception.getSqlState();
    }

    /** Returns the message text of the backing exception */
    @Override
    public String message() {
      return exception.getMessage();
    }

    /** Returns the string representation of the backing exception */
    @Override
    public String toString() {
      return exception.toString();
    }
  }

  /**
   * Implementation of {@link OracleR2dbcWarning}. All behavior is inherited
   * from {@link MessageImpl}; this class only adds the
   * {@code OracleR2dbcWarning} type to the segment.
   */
  private static final class WarningImpl
    extends MessageImpl
    implements OracleR2dbcWarning {

    /**
     * Constructs a warning backed by an {@code exception}.
     * @param exception Exception passed to the {@link MessageImpl}
     * constructor.
     */
    private WarningImpl(R2dbcException exception) {
      super(exception);
    }

  }

  /**
   * Wraps a {@code publisher} so that it can be subscribed to only once. The
   * first {@link org.reactivestreams.Subscriber} receives the signals of the
   * {@code publisher}. Any additional {@code Subscriber} is rejected: it
   * receives {@code onError} with an {@link IllegalStateException}.
   * @param publisher Publisher that emits signals. Not null.
   * @param <T> Value type of {@code onNext} signals
   * @return A {@code Publisher} that allows a single subscriber
   */
  private static <T> Publisher<T> singleSubscriber(Publisher<T> publisher) {
    final AtomicBoolean claimed = new AtomicBoolean();

    return Flux.defer(() -> {
      // The supplier runs once per subscription; only the first subscription
      // flips the flag and gets the actual publisher
      if (claimed.compareAndSet(false, true))
        return publisher;

      return Mono.error(new IllegalStateException(
        "Publisher does not support multiple subscribers"));
    });
  }
}
